Connector proxy airbyte #425
Conversation
```rust
if let Some(repo_tags) = &self.repo_tags {
    for tag in repo_tags {
        if tag.starts_with("ghcr.io/estuary/materialize-") {
```
This condition works for now for published materializations, but I personally use local Docker images for testing, and I can also imagine other people writing materializations for Flow at some point. Should we also use a Docker label for this, and only fall back to this check for backward compatibility?
Yes! The intention is that we move to docker labels for all of our published connectors, where those labels carry metadata like whether it's a capture or materialization, and its specific protocol.
We haven't done this yet, though, and this is a short-term hack to keep things working.
Just in case it's useful: you can ask docker to build an image, or locally re-tag an image, to ghcr.io/estuary/materialize-my-thing, even if you have no intention of pushing that up as an image.
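A label-first check with a tag-prefix fallback could be sketched roughly like this. This is purely illustrative: the `ImageInspect` shape and the `FLOW_RUNTIME_PROTOCOL` label key are assumptions for the sketch, not the actual implementation.

```rust
use std::collections::HashMap;

/// Hypothetical subset of image metadata, loosely mirroring `docker inspect` output.
struct ImageInspect {
    repo_tags: Option<Vec<String>>,
    labels: Option<HashMap<String, String>>,
}

impl ImageInspect {
    /// Prefer an explicit label; fall back to the tag-prefix heuristic
    /// for images published before labels were introduced.
    fn is_materialization(&self) -> bool {
        if let Some(labels) = &self.labels {
            // Label key is an assumption, used here only for illustration.
            if let Some(proto) = labels.get("FLOW_RUNTIME_PROTOCOL") {
                return proto == "materialize";
            }
        }
        // Backward-compatible fallback: the published-image naming convention.
        if let Some(repo_tags) = &self.repo_tags {
            return repo_tags
                .iter()
                .any(|tag| tag.starts_with("ghcr.io/estuary/materialize-"));
        }
        false
    }
}
```

With this shape, a locally built image only needs the label (or a local re-tag, as suggested above) to be recognized.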
@jgraettinger I put a TODO here for us to revisit this 👍🏽
Just to surface it, here's the other place where we're using this hack, as part of the `flowctl api spec` subcommand:
https://github.com/estuary/flow/blob/master/go/flowctl-go/cmd-api-spec.go#L44-L49
Okay, I got to test using
I just had to make some minor adjustments to the Airbyte specification (some optional fields were required by our code, etc.) and a fix for argument ordering when invoking the connectors. I put in a bunch of TODOs for things to improve. I'm still a bit afraid of the potential blast radius of this PR, so a few questions:
I will endeavour to write tests for this piece so that we can iterate on it with more confidence. I will do that before refactoring, to make sure I don't break things as I rework some parts of the code.
@mdibaiee and I connected and are in the process of testing this for Facebook with a customer use case.
For the record, the Airbyte Facebook connector is using a deprecated API and as such is currently not working:
There was a bug in the implementation of
I think this is getting pretty close. All but one of my inline comments are things that can be deferred until later.
One other thing I noticed, which I couldn't comment on inline: the `examples/examples.db*` files should not be checked in. Looks like they were added by accident.
Apart from checking the usage of `BytesMut` in the response stream, I think this is OK to merge.
```rust
// We ignore JSONs that are not Airbyte Messages according
// to the specification:
// https://docs.airbyte.com/understanding-airbyte/airbyte-specification#the-airbyte-protocol
Err(_) => return Ok(None),
```
Upon a closer read, this isn't right, either. The wording of the spec is ambiguous, and makes it seem like you could expect that each line on stdout is JSON, but that is not so. Here are some completely reasonable assumptions about Airbyte connectors that turned out to be wrong:
- stdout will only have JSON: Nope, it'll also contain lines of plain text mixed in.
- all logs are emitted as Airbyte `Message`s: Nope, see above. The `logMessage`s seem to be more for things that are meant to be surfaced directly to the end user, as opposed to debug logs.
- debug logs are written to stderr: Nope, only some of their connectors do that. Many log only to stdout, or to both.

The behavior we're using currently is sort of the opposite of what's here. Lines are allowed to not parse as JSON, but if they do parse as JSON, then they're required to deserialize successfully as Airbyte messages. Based on yet another read of the spec, we might also need to permit that second case, but it hasn't caused an issue so far.
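The tolerant behavior described above could be sketched roughly as follows. This is illustrative only: a real implementation would use `serde_json` for both checks; here "parses as JSON" is approximated by a brace check and "deserializes as an Airbyte message" by looking for a `"type"` key, both of which are simplistic stand-ins.

```rust
/// Outcome of inspecting one line of connector stdout (illustrative sketch).
#[derive(Debug, PartialEq)]
enum LineOutcome {
    /// Not JSON at all: tolerate it, surfacing it as a plain-text log.
    PlainText,
    /// JSON that deserialized as an Airbyte message: process it.
    Message,
    /// JSON that is *not* a valid Airbyte message: treat as an error.
    Invalid,
}

fn classify_line(line: &str) -> LineOutcome {
    let trimmed = line.trim();
    // Stand-in for "does this parse as JSON?" (really: serde_json::from_str).
    if !trimmed.starts_with('{') || !trimmed.ends_with('}') {
        return LineOutcome::PlainText;
    }
    // Stand-in for "does it deserialize as an Airbyte Message?".
    if trimmed.contains("\"type\"") {
        LineOutcome::Message
    } else {
        LineOutcome::Invalid
    }
}
```

The point of the sketch is just the three-way split: non-JSON lines are tolerated, while JSON lines are held to the message schema.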
I read the protocol specification once again and my reading still is that each line must be JSON, although it can be a non-AirbyteMessage, in which case it's ignored. https://docs.airbyte.com/understanding-airbyte/airbyte-specification/#the-airbyte-protocol
There may be cases where debug logs are printed to stdout as plaintext (as in one example I opened a PR for), but I'm pretty sure that's a bug. However, I think Airbyte is being forgiving about this in their own product, which leads to these bugs going unnoticed. When I asked one of their engineers about this plaintext print statement, he was very surprised and said, "I wonder how it's working at all".
Now with that said, we might say we do not trust connectors to conform to the specification properly and we would rather be forgiving to avoid issues on our side, and that's reasonable.
Okay, I updated the code so that we now operate on lines, rather than on chunks of length-capped bytes directly. This has simplified the message handling function a lot, and now allows us to handle plaintext lines. The serde `StreamDeserializer` could not handle this (i.e. it would never progress if it sees an input such as `I am plaintext\n{"x": 2}`) because it does not operate on lines.
I am not sure if there is any consequence to switching to operating over lines. For one thing, I know that each line can be large; does it cause a problem to have a single large `Bytes` rather than multiple smaller ones? We used to cap our `Bytes` at 4096 bytes. I still don't expect us to go over three times that number in each line, but it's always a possibility.
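If line size ever becomes a concern, one option is to read lines with an explicit cap and fail loudly when a line exceeds it. Below is a stdlib-only sketch of that idea; the actual code is async and works with `Bytes`, so this is only the shape of the approach, not the implementation.

```rust
use std::io::{BufRead, Error, ErrorKind, Read};

/// Read one newline-delimited line, refusing to buffer more than
/// `max_len` bytes so a pathological connector can't exhaust memory.
/// Returns Ok(None) at EOF. On an oversized line this errors and
/// leaves the reader mid-line, so callers should abort the stream.
fn read_capped_line<R: BufRead>(
    reader: &mut R,
    max_len: usize,
) -> std::io::Result<Option<Vec<u8>>> {
    let mut buf = Vec::new();
    // Cap the read at max_len + 1: seeing that extra byte without a
    // newline proves the line is too long.
    let n = reader
        .by_ref()
        .take(max_len as u64 + 1)
        .read_until(b'\n', &mut buf)?;
    if n == 0 {
        return Ok(None); // clean EOF
    }
    if buf.last() == Some(&b'\n') {
        buf.pop(); // strip the delimiter
    }
    if buf.len() > max_len {
        return Err(Error::new(ErrorKind::InvalidData, "line exceeds maximum length"));
    }
    Ok(Some(buf))
}
```

The design trades completeness for safety: rather than growing the buffer without bound, an over-long line is treated as a protocol violation.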
I don't see any way at the moment to deserialize lines into JSON while handling plaintext without somehow running the deserializer on a line, so in the end we will have to feed "one line" to the deserializer.
> so in the end we will have to feed "one line" to the deserializer.
Yeah, I think that's right.
> There may be cases where debug logs are printed to stdout as plaintext (airbytehq/airbyte#11977 which I opened a PR for) but I'm pretty sure it's a bug.
Yeah, I'd agree with that, as well as your suggestion that we should also tolerate it, since that's what airbyte does. Basically, I think we have to treat the "spec" as aspirational, and consider the "real spec" to be whatever airbyte accepts. The rationale is that users will not be very understanding if they can't use an airbyte connector in Flow because of a "bug", when that connector seems to work just fine in Airbyte. I do think that pushing for conformance to the published spec is also important, though, and so ultimately I think we need to take it case by case. The stdout logging issue is just one that we'd seen in (anecdotally) many connectors, but I don't really have a good sense of how common it is. Sorry for the ramble, but I think my conclusion here is just that I'd trust your judgement on what to do here now that you're informed on the issue.
* wip moving away from try_stream!
* refactor simple streams into stream::once
* more stream refactoring
* more refactorings
* simplify stream usage with helper functions
* more comments
* two more small streams
That last round of changes looks great. Merge on! 🚀